package tcpProxy

import (
	"encoding/json"
	"github.com/rs/xid"
	"github.com/wumingyu12/wensMqtt/common"
	"github.com/wumingyu12/wensMqtt/model"
	"log"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"time"
)

// TcpMqttProxy accepts local TCP connections and relays their traffic over
// MQTT publish/subscribe calls made through base.
type TcpMqttProxy struct {
	// base is the MQTT link used for every publish and subscribe call in this
	// file; it is assigned by SetUp.
	base *model.BaseLink
}

// newonfig returns the MQTT connection settings used by SetUp.
//
// The client id is "resetfulapi-" followed by a random number in [0, 100000)
// drawn from a generator seeded with the current time, so every call yields
// a (most likely) different id.
func newonfig() model.Config {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	suffix := strconv.Itoa(rng.Intn(100000))

	return model.Config{
		PassWord:  "12345678",
		UserName:  "resetfulapi",
		ClientId:  "resetfulapi-" + suffix,
		BrokerUrl: "tcp://wmqtt.wens.com.cn:1883",
		// Always request a clean session; per the original author's note,
		// sessions otherwise pile up on the MQTT server.
		IsCleanSession: true,
	}
}

// SetUp allocates the proxy's BaseLink, configures it with the settings from
// newonfig and then opens the link.
//
// It returns the first error reported by either step. self.base is assigned
// before configuration starts, so it stays set even when an error is returned.
func (self *TcpMqttProxy) SetUp() error {
	base := new(model.BaseLink)
	self.base = base

	// Apply the MQTT connection parameters.
	if err := base.SetUp(newonfig()); err != nil {
		return err
	}

	// NOTE(review): link is an unexported method of a type that lives in
	// another package, which Go does not allow calling from here; confirm the
	// exported name that BaseLink actually provides.
	if err := base.link(); err != nil {
		return err
	}

	log.Println("mqtt 连接成功")
	return nil
}

// mqttConn holds the per-session MQTT state that connectMqtt uses for one
// tunneled TCP connection.
type mqttConn struct {
	// ReadTopic is the topic connectMqtt subscribes to for messages coming
	// back from the remote side.
	ReadTopic  string
	// WriteTopic is the topic that bytes read from the local connection are
	// published to.
	WriteTopic string
	// HasConnet reports whether the remote side has already connected to its
	// target port.
	HasConnet  bool
}

// Link listens on 127.0.0.1:localport and hands every accepted connection to
// connectMqtt in its own goroutine so it can be tunneled to remotePort.
//
// Parameters:
//   - clientId: the MQTT client id of the remote peer; it is passed through
//     to connectMqtt unchanged.
//   - localport: the loopback TCP port to listen on.
//   - remotePort: the port the remote peer is asked to connect to.
//
// Link blocks for as long as the listener is accepting. If the listener
// cannot be created, the error is logged and Link returns.
func (self *TcpMqttProxy) Link(clientId string, localport int, remotePort int) {
	addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(localport))
	log.Println(clientId, localport, remotePort)

	// Open the local TCP listener.
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Println(err)
		return
	}
	defer lis.Close()

	// Accept errors are retried, but with a capped exponential delay: retrying
	// immediately would spin the CPU and flood the log while a failure (for
	// example, running out of file descriptors) persists.
	var delay time.Duration
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Printf("建立连接错误:%v\n", err)
			if delay == 0 {
				delay = 5 * time.Millisecond
			} else {
				delay *= 2
			}
			if delay > time.Second {
				delay = time.Second
			}
			time.Sleep(delay)
			continue
		}
		delay = 0 // a successful accept ends the failure streak

		log.Println(conn.RemoteAddr(), conn.LocalAddr())
		ch := make(chan string, 3)
		go self.connectMqtt(conn, ch, clientId, remotePort)
	}
}

// connectMqtt tunnels one accepted local TCP connection to remotePort on the
// peer identified by clientId, using MQTT topics as the transport.
//
// Sequence:
//  1. Publish an "establish" control message carrying a fresh session id.
//  2. Subscribe to the session's response topic and wait for the
//     common.Const_tcpHasEstablish marker.
//  3. Once the marker arrives, start a goroutine that publishes everything
//     read from sconn to the session's write topic; later messages on the
//     response topic are written to sconn.
//
// If the marker has not arrived within 3 seconds, sconn is closed. When
// reading from sconn ends, a "close" control message is published and sconn
// is closed.
//
// Parameters:
//   - sconn: the accepted local connection.
//   - ch: not used by this method; kept so the signature stays unchanged.
//   - clientId: substituted for {{clientId}} in the topic templates.
//   - remotePort: sent as LinkPort in the control messages.
func (self *TcpMqttProxy) connectMqtt(sconn net.Conn, ch chan string, clientId string, remotePort int) {
	tcpctrlTopic := strings.Replace(common.Const_tcpbridgectrl, "{{clientId}}", clientId, -1)
	sessionId := xid.New().String()

	// publishCtrl encodes ctrl as JSON and publishes it on the control topic.
	// It reports whether the message could be encoded.
	publishCtrl := func(label string, ctrl common.TcpLinkControl) bool {
		payload, err := json.Marshal(ctrl)
		if err != nil {
			log.Println(label, tcpctrlTopic, err)
			return false
		}
		self.base.MqttPub(tcpctrlTopic, payload)
		// Log the JSON as text; passing the raw []byte would print a list of
		// numbers.
		log.Println(label, tcpctrlTopic, string(payload))
		return true
	}

	// Ask the remote side to open its TCP connection.
	sent := publishCtrl("请求连接", common.TcpLinkControl{
		Type:      common.Const_TcpBridge_Establish,
		SessionId: sessionId,
		LinkPort:  remotePort,
	})
	if !sent {
		// Without a request the remote side can never answer.
		sconn.Close()
		return
	}

	mqttc := &mqttConn{}
	writeTopic := strings.Replace(common.Const_tcpsub, "{{clientId}}", clientId, -1)
	writeTopic = strings.Replace(writeTopic, "{{sessionId}}", sessionId, -1)
	mqttc.WriteTopic = writeTopic
	mqttc.ReadTopic = writeTopic + common.Const_resptail

	// connected carries the "remote side is ready" signal to the watchdog.
	// A channel is used instead of letting the watchdog read
	// mqttc.HasConnet, which the subscription callback writes from another
	// goroutine. The buffer of one lets the callback signal without blocking
	// even after the watchdog has already gone.
	connected := make(chan struct{}, 1)

	// Watchdog: give up if the remote side has not confirmed within 3 seconds.
	go func() {
		timer := time.NewTimer(3 * time.Second)
		defer timer.Stop()
		select {
		case <-connected:
		case <-timer.C:
			// Prefer the confirmation if it arrived at the same instant.
			select {
			case <-connected:
				return
			default:
			}
			log.Println("等待3秒对方还没建立连接，主动断开")
			sconn.Close()
		}
	}()

	// startSend forwards local reads to the write topic until reading fails,
	// then asks the remote side to close and closes the local connection.
	startSend := func() {
		log.Println("开始转发")
		defer sconn.Close()
		// Runs before sconn.Close: tell the remote side to drop its
		// connection as well.
		defer publishCtrl("请求关闭连接", common.TcpLinkControl{
			Type:      common.Const_TcpBridge_Close,
			SessionId: sessionId,
			LinkPort:  remotePort,
		})

		buf := make([]byte, 1024)
		for {
			n, err := sconn.Read(buf)
			// A Read may return data together with an error; forward the data
			// before looking at the error so no bytes are lost.
			if n > 0 {
				// Copy, because buf is overwritten by the next Read.
				payload := make([]byte, n)
				copy(payload, buf[:n])
				log.Println("发送", string(payload), "到", mqttc.WriteTopic)
				self.base.MqttPub(mqttc.WriteTopic, payload)
			}
			if err != nil {
				log.Println(err)
				return
			}
		}
	}

	// NOTE(review): the establish request is published before this
	// subscription is made, as in the original ordering; confirm a fast reply
	// cannot arrive before the subscription is active.
	self.base.MqttSub(mqttc.ReadTopic, func(topic string, msg []byte) {
		if mqttc.HasConnet {
			// Established: relay the payload to the local connection as is.
			log.Println("接收到", string(msg))
			if _, err := sconn.Write(msg); err != nil {
				log.Println(err)
			}
			return
		}

		// Not established yet: the first expected message is the success marker.
		if string(msg) != common.Const_tcpHasEstablish {
			// The remote side could not connect to its target port. The
			// watchdog closes the local connection when its timer expires.
			log.Println("远端建立连接失败", string(msg))
			return
		}

		log.Println("远端建立连接成功")
		mqttc.HasConnet = true
		select {
		case connected <- struct{}{}:
		default:
		}
		go startSend()
	})
}
